package net.xdclass.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.extern.slf4j.Slf4j;
import net.xdclass.config.RabbitMQConfig;
import net.xdclass.constant.RedisKey;
import net.xdclass.controller.request.TrafficPageRequest;
import net.xdclass.controller.request.UseTrafficRequest;
import net.xdclass.enums.BizCodeEnum;
import net.xdclass.enums.EventMessageType;
import net.xdclass.enums.TaskStateEnum;
import net.xdclass.exception.BizException;
import net.xdclass.feign.ProductFeignService;
import net.xdclass.feign.ShortLinkFeignService;
import net.xdclass.interceptor.LoginInterceptor;
import net.xdclass.manager.TrafficManager;
import net.xdclass.manager.TrafficTaskManager;
import net.xdclass.model.EventMessage;
import net.xdclass.model.TrafficDO;
import net.xdclass.model.TrafficTaskDO;
import net.xdclass.service.TrafficService;
import net.xdclass.util.JsonData;
import net.xdclass.util.TimeUtil;
import net.xdclass.vo.ProductVO;
import net.xdclass.vo.TrafficVO;
import net.xdclass.vo.UseTrafficVO;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Service
@Slf4j
public class TrafficServiceImpl implements TrafficService {

    @Autowired
    private TrafficManager trafficManager;

    @Autowired
    private ProductFeignService productFeignService;

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private TrafficTaskManager trafficTaskManager;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitMQConfig rabbitMQConfig;

    @Autowired
    private ShortLinkFeignService shortLinkFeignService;

    @Override
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
    public void handleTrafficMessage(EventMessage eventMessage) {
        Long accountNo = eventMessage.getAccountNo();
        String messageType = eventMessage.getEventMessageType();
        if (EventMessageType.PRODUCT_ORDER_PAY.name().equalsIgnoreCase(messageType)) {
            //订单已支付，新增流量
            String content = eventMessage.getContent();
            JSONObject jsonObject = JSONObject.parseObject(content);
//            Map<String, Object> orderInfoMap = JsonUtil.json2Obj(content, Map.class);
            String outTradeNo = jsonObject.getString("outTradeNo");
            Integer buyNum = jsonObject.getInteger("buyNum");
            String product = jsonObject.getString("product");

            ProductVO productVO = JSON.parseObject(product, ProductVO.class);
            log.info("商品信息:{}", productVO);

            //流量包有效期
            LocalDateTime expiredDateTime = LocalDateTime.now().plusDays(productVO.getValidDay());
            Date date = Date.from(expiredDateTime.atZone(ZoneId.systemDefault()).toInstant());

            //构建流量包对象
            TrafficDO trafficDO = TrafficDO.builder()
                    .accountNo(accountNo)
                    .dayLimit(productVO.getDayTimes() * buyNum)
                    .dayUsed(0)
                    .totalLimit(productVO.getTotalTimes())
                    .pluginType(productVO.getPluginType())
                    .level(productVO.getLevel())
                    .productId(productVO.getId())
                    .outTradeNo(outTradeNo)
                    .expiredDate(date)
                    .build();

            int rows = trafficManager.add(trafficDO);
            log.info("消费消息新增流量包:rows={}", rows);

            //新增流量包，删除这个key
            String totalTrafficTimesKey = String.format(RedisKey.DAY_TOTAL_TRAFFIC, accountNo);
            redisTemplate.delete(totalTrafficTimesKey);

        } else if (EventMessageType.TRAFFIC_FREE_INIT.name().equalsIgnoreCase(messageType)) {
            //发放免费流量包
            Long productId = Long.valueOf(eventMessage.getBizId());

            JsonData jsonData = productFeignService.detail(productId);

            ProductVO productVO = jsonData.getData(new TypeReference<ProductVO>() {});

            TrafficDO trafficDO = TrafficDO.builder()
                    .accountNo(accountNo)
                    .dayLimit(productVO.getDayTimes())
                    .dayUsed(0)
                    .totalLimit(productVO.getTotalTimes())
                    .pluginType(productVO.getPluginType())
                    .level(productVO.getLevel())
                    .productId(productVO.getId())
                    .outTradeNo("free_init")
                    .expiredDate(new Date())
                    .build();
            trafficManager.add(trafficDO);
        } else if (EventMessageType.TRAFFIC_USED.name().equalsIgnoreCase(messageType)) {
            //流量包使用，检查是否成功使用
            //检查task是否存在
            //检查短链是否成功
            //如果不成功，则恢复流量包
            //删除task
            Long trafficTaskId = Long.valueOf(eventMessage.getBizId());
            TrafficTaskDO trafficTaskDO = trafficTaskManager.findByIdAndAccountNo(trafficTaskId, accountNo);

            if (trafficTaskDO != null && trafficTaskDO.getLockState().equalsIgnoreCase(TaskStateEnum.LOCK.name())) {

                JsonData jsonData = shortLinkFeignService.check(trafficTaskDO.getBizId());
                if (jsonData.getCode() != 0) {
                    log.error("创建短链失败，流量包回滚");
                    String today = TimeUtil.format(trafficTaskDO.getGmtCreate(), "yyyy-MM-dd");

                    trafficManager.releaseUsedTimes(accountNo, trafficTaskDO.getTrafficId(), 1, today);

                    //恢复流量包，删除key
                    String totalTrafficTimesKey = String.format(RedisKey.DAY_TOTAL_TRAFFIC, accountNo);
                    redisTemplate.delete(totalTrafficTimesKey);
                }
                trafficTaskManager.deleteByIdAndAccountNo(trafficTaskId, accountNo);
            }

        }
    }

    @Override
    public Map<String, Object> pageAvailable(TrafficPageRequest request) {
        int page = request.getPage();
        int size = request.getSize();
        Long accountNo = LoginInterceptor.threadLocal.get().getAccountNo();

        IPage<TrafficDO> trafficDOIPage = trafficManager.pageAvailable(page, size, accountNo);
        //流量包列表
        List<TrafficDO> records = trafficDOIPage.getRecords();
        List<TrafficVO> trafficVOList = records.stream().map(obj -> beanProcess(obj))
                .collect(Collectors.toList());

        Map<String, Object> pageMap = new HashMap<>();
        pageMap.put("total_record", trafficDOIPage.getTotal());
        pageMap.put("total_page", trafficDOIPage.getPages());
        pageMap.put("current_data", trafficVOList);
        return pageMap;
    }

    @Override
    public TrafficVO detail(long trafficId) {
        Long accountNo = LoginInterceptor.threadLocal.get().getAccountNo();

        TrafficDO trafficDO = trafficManager.findByIdAndAccountNo(trafficId, accountNo);

        return beanProcess(trafficDO);
    }

    @Override
    public boolean deleteExpireTraffic() {
        return trafficManager.deleteExpireTraffic();
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public JsonData reduce(UseTrafficRequest trafficRequest) {
        Long accountNo = trafficRequest.getAccountNo();
        //处理流量包，筛选出未更新流量包，当前使用的流量包
        UseTrafficVO useTrafficVO = processTrafficList(accountNo);
        log.info("今天可用总次数:{},当前使用流量包:{}", useTrafficVO.getDayTotalLeftTimes(), useTrafficVO.getDayTotalLeftTimes());
        if (useTrafficVO == null) {
            return JsonData.buildResult(BizCodeEnum.TRAFFIC_REDUCE_FAIL);
        }
        log.info("待更新流量包列表:{}", useTrafficVO.getUnUpdatedTrafficIds());
        if (useTrafficVO.getUnUpdatedTrafficIds().size()>0) {
            //更新今日流量包
            trafficManager.batchUpdateUsedTimes(accountNo, useTrafficVO.getUnUpdatedTrafficIds());
        }
        //先更新，在扣减
        int rows = trafficManager.addDayUsedTimes(accountNo, useTrafficVO.getCurrentTrafficDO().getId(), 1);

        TrafficTaskDO trafficTaskDO = TrafficTaskDO.builder().accountNo(accountNo).bizId(trafficRequest.getBizId())
                .useTimes(1).trafficId(useTrafficVO.getCurrentTrafficDO().getId())
                .lockState(TaskStateEnum.LOCK.name()).build();

        trafficTaskManager.add(trafficTaskDO);

        if (rows!=1) {
            throw new BizException(BizCodeEnum.TRAFFIC_REDUCE_FAIL);
        }
        //往redis设置下总流量包次数，短链服务那边递减即可，如果有新增流量包，则删除这个key
        Integer leftSeconds = TimeUtil.getRemainSecondsOneDay(new Date());
        String totalTrafficTimesKey = String.format(RedisKey.DAY_TOTAL_TRAFFIC, accountNo);
        redisTemplate.opsForValue().set(totalTrafficTimesKey, useTrafficVO.getDayTotalLeftTimes()-1,leftSeconds, TimeUnit.SECONDS);

        EventMessage eventMessage = EventMessage.builder().accountNo(accountNo)
                .bizId(trafficTaskDO.getId() + "")
                .eventMessageType(EventMessageType.TRAFFIC_USED.name())
                .build();

        //发送延迟消息，用于异常回滚
        rabbitTemplate.convertAndSend(rabbitMQConfig.getTrafficEventExchange(),
                rabbitMQConfig.getTrafficReleaseRoutingKey(), eventMessage);

        return JsonData.buildSuccess();
    }

    private UseTrafficVO processTrafficList(Long accountNo) {
        List<TrafficDO> list = trafficManager.selectAvailableTraffic(accountNo);
        if (list == null || list.size()==0) {
            throw new BizException(BizCodeEnum.TRAFFIC_EXCEPTION);
        }
        //天剩余可用总次数
        Integer dayTotalLeftTimes = 0;

        //当前使用
        TrafficDO currentTrafficDO = null;

        //没过期，但是今天没更新的流量包id列表
        List<Long> unUpdatedTrafficIds = new ArrayList<>();

        //今天日期
        String todayStr = TimeUtil.format(new Date(), "yyyy-MM-dd");

        for (TrafficDO trafficDO : list) {
            String updateDate = TimeUtil.format(trafficDO.getGmtModified(), "yyyy-MM-dd");
            if (todayStr.equalsIgnoreCase(updateDate)) {
                //已经更新 天剩余可用总次数 = 总次数 - 已用
                int dayLeftTimes = trafficDO.getDayLimit() - trafficDO.getDayUsed();
                dayTotalLeftTimes = dayTotalLeftTimes + dayLeftTimes;

                //选取当次使用流量包
                if (dayLeftTimes>0 && currentTrafficDO==null) {
                    currentTrafficDO = trafficDO;
                }
            } else {
                //未更新
                dayTotalLeftTimes = dayTotalLeftTimes + trafficDO.getDayLimit();
                // 记录未更新的流量包
                unUpdatedTrafficIds.add(trafficDO.getId());

                //选取当次使用流量包
                if (currentTrafficDO == null) {
                    currentTrafficDO = trafficDO;
                }
            }
        }

        UseTrafficVO useTrafficVO = new UseTrafficVO(dayTotalLeftTimes, currentTrafficDO, unUpdatedTrafficIds);
        return useTrafficVO;
    }

    private TrafficVO beanProcess(TrafficDO trafficDO) {
        TrafficVO trafficVO = new TrafficVO();
        BeanUtils.copyProperties(trafficDO, trafficVO);
        return trafficVO;
    }
}
